// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using System;
using System.Threading;
using KurrentDB.Core.Bus;
using KurrentDB.Core.DataStructures;
using KurrentDB.Core.Messages;
using KurrentDB.Core.Services.Transport.Http.Messages;
using KurrentDB.Transport.Http.EntityManagement;
using HttpStatusCode = KurrentDB.Transport.Http.HttpStatusCode;
using ILogger = Serilog.ILogger;

namespace KurrentDB.Core.Services.Transport.Http;

// Called by worker bus concurrently. Thread Safe.
internal class AuthenticatedHttpRequestProcessor : IHandle<HttpMessage.PurgeTimedOutRequests>, IHandle<AuthenticatedHttpRequestMessage> {
	private static readonly ILogger Log = Serilog.Log.ForContext<AuthenticatedHttpRequestProcessor>();

	private readonly PairingHeap<Tuple<DateTime, HttpEntityManager>> _pending = new((x, y) => x.Item1 < y.Item1);

	public void Handle(HttpMessage.PurgeTimedOutRequests message) {
		PurgeTimedOutRequests();
	}

	private void PurgeTimedOutRequests() {
		if (!Monitor.TryEnter(_pending))
			return; // no need to wait, we can purge next time
		try {
			while (_pending.Count > 0) {
				var req = _pending.FindMin();
				if (req.Item1 <= DateTime.UtcNow || req.Item2.IsProcessing) {
					req = _pending.DeleteMin();
					req.Item2.ReplyStatus(HttpStatusCode.RequestTimeout,
						"Server was unable to handle request in time",
						e => Log.Debug(
							"Error occurred while closing timed out connection (HTTP service core): {e}.",
							e.Message));
				} else
					break;
			}
		} catch (Exception exc) {
			Log.Error(exc, "Error purging timed out requests in HTTP request processor.");
		} finally {
			Monitor.Exit(_pending);
		}
	}

	public void Handle(AuthenticatedHttpRequestMessage message) {
		var manager = message.Manager;
		var match = message.Match;
		try {
			var reqParams = match.RequestHandler(manager, match.TemplateMatch);
			if (!reqParams.IsDone)
				lock (_pending)
					_pending.Add(Tuple.Create(DateTime.UtcNow + reqParams.Timeout, manager));
		} catch (Exception exc) {
			Log.Error(exc, "Error while handling HTTP request '{url}'.", manager.HttpEntity.Request.Url);
			InternalServerError(manager);
		}

		PurgeTimedOutRequests();
	}

	private static void InternalServerError(HttpEntityManager entity) {
		entity.ReplyStatus(HttpStatusCode.InternalServerError, "Internal Server Error",
			e => Log.Debug("Error while closing HTTP connection (HTTP service core): {e}.", e.Message));
	}
}
